package com.atguigu.flinksql;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Demo of an event-time OVER window defined with the Flink Table API.
 *
 * <p>A small, fixed set of {@code WaterSensor} readings is turned into a dynamic table whose
 * {@code ts} column is the rowtime attribute. For every row the job prints {@code id},
 * {@code ts}, {@code vc} and the running sum of {@code vc} over all rows of the same
 * {@code id} from the start of the stream up to the current row.
 */
public class Flink13_Window_Over {

    /**
     * Builds the pipeline and runs it, printing the result table to standard output.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Pin the REST port used by the local environment.
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 10000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(2);

        // Event-time timestamps are taken from WaterSensor#getTs; watermarks tolerate
        // up to 3 seconds of out-of-orderness.
        DataStream<WaterSensor> waterSensorStream =
                env
                        .fromElements(new WaterSensor("sensor_1", 1001L, 10),
                                new WaterSensor("sensor_1", 4000L, 20),
                                new WaterSensor("sensor_1", 4000L, 40),
                                new WaterSensor("sensor_1", 5000L, 50),
                                new WaterSensor("sensor_2", 6000L, 30),
                                new WaterSensor("sensor_2", 7000L, 60))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy
                                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                        .withTimestampAssigner((ws, ts) -> ws.getTs())
                        );

        // 1. Create the table environment.
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Create the dynamic table; "ts" is declared as the event-time (rowtime) attribute.
        Table table = tEnv.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));

        // 3. Define the over window with the Table API:
        //    partitioned by id, ordered by event time, spanning from the unbounded
        //    past (UNBOUNDED_RANGE) up to the current row's range (CURRENT_RANGE).
        OverWindow w = Over
                .partitionBy($("id"))
                .orderBy($("ts"))
                .preceding(Expressions.UNBOUNDED_RANGE)
                .following(Expressions.CURRENT_RANGE)
                .as("w");

        // 4. Query the table: running sum of vc per id over window "w".
        //    execute() submits the job itself and print() consumes its result, so no
        //    additional env.execute() call is needed. The former trailing env.execute()
        //    was redundant: it either started a second, sink-less job or failed with an
        //    exception that was merely printed.
        table.window(w)
                .select($("id"), $("ts"), $("vc"), $("vc").sum().over($("w")).as("sum_vc"))
                .execute()
                .print();
    }
}
